#include "access/pushpage.h"

#include "postgres.h"
#include "utils/elog.h"
#include "utils/guc.h"
#include "utils/builtins.h"
#include "libpq-fe.h"
#include "lib/stringinfo.h"
#include "access/xlog.h"
#include "utils/pg_lsn.h"


clock_t start_time;
XLogRecPtr	PushPtr = 0;
XLogRecPtr  ApplyLsn = 0;
XLogRecPtr PrePushPtr = 0;
CheckPoint GlobalCheckPoint;
uint8 GlobalState;


#define maxconnlen  1024
static PGconn *pushconn = NULL;
static PGconn *connToPushStandby = NULL;
pid_t startupPid = 0;
static redisContext *redisconn = NULL;
static bool ConnectRedis() {
	redisconn = redisConnect("127.0.0.1", 6379);
	if (redisconn == NULL) {
		ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("connect to redis failed")));
		return false;
	}
	if (redisconn->err) {
		ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("connect to redis failed: %s",redisconn->errstr)));
		redisconn = NULL;
		return false;
	}
	char*redis_password = "VlJi7uBV";
	redisReply *reply = (redisReply *)redisCommand(redisconn, "AUTH %s", redis_password);
	if (reply->type == REDIS_REPLY_ERROR) {
		ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("connect to redis passwd failed: %s",redisconn->errstr)));
		redisconn = NULL;
		return false;
	}
	return true;
}

bool pushRedisList(const char*str) {
	if (redisconn == NULL) {
		if (ConnectRedis() == false) {
			return false;
		}
	}
	redisReply* r = (redisReply*)redisCommand(redisconn, str);
	if (NULL == r) {
    	redisFree(redisconn);
		redisconn = NULL;
    	return false;
  	}
	if (!(r->type == REDIS_REPLY_STATUS && strcasecmp(r->str,"OK") == 0) && (r->type!=REDIS_REPLY_INTEGER)) {
    	ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("connect to redis failed1: %s",r->str)));
    	freeReplyObject(r);
		redisFree(redisconn);
		redisconn = NULL;
    	return false;
	}
	freeReplyObject(r);
	return true;
}


static bool ConnectPushStandbyDB() {
	char	   *err;
	const char *keys[] = {"dbname","user","password","host","port",NULL};
	const char *vals[] = {"postgres","repl","123456","127.0.0.1","15431",NULL};
	connToPushStandby = PQconnectdbParams(keys, vals, false);
	if (PQstatus(connToPushStandby) == CONNECTION_BAD)
	{
		err = pchomp(PQerrorMessage(connToPushStandby));
		ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("push standby could not connect to the push standby server: %s", err)));
		return false;
	}
	return true;
	
}


static bool ConnectPrimaryDB() {
	char	   *err;
	char		conninfo[maxconnlen];
	const char *keys[] = {"dbname","user","password","host","port",NULL};
	const char *vals[] = {"postgres","repl","123456","127.0.0.1","15432",NULL};
	strlcpy(conninfo, (char *) PrimaryConnInfo, maxconnlen);
	/* Establish the connection to the primary for query Min Lsn*/
	/*
	 * We use the expand_dbname parameter to process the connection string (or
	 * URI), and pass some extra options.
	 */
	/* Note we do not want libpq to re-expand the dbname parameter */
	pushconn = PQconnectdbParams(keys, vals, true);
	if (PQstatus(pushconn) == CONNECTION_BAD)
	{
		err = pchomp(PQerrorMessage(pushconn));
		ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("push standby could not connect to the primary server: %s", err)));
		return false;
	}
	return true;	
}

XLogRecPtr QueryPushLsn() 
{
	StringInfoData cmd;
	XLogRecPtr 		replylsn = InvalidXLogRecPtr;
	char       *replyptr;
	initStringInfo(&cmd);
	appendStringInfoString(&cmd,"select pg_last_wal_replay_lsn()");
	replylsn = InvalidXLogRecPtr;
	if (connToPushStandby == NULL) {
		if (ConnectPushStandbyDB() == false) {
			return InvalidXLogRecPtr;
		} 
	} 
	PGresult   *pgres = NULL;
	pgres = PQexec(connToPushStandby, cmd.data);
	if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
		replyptr = PQgetvalue(pgres, 0, 0);
		bool flag;
		replylsn = pg_lsn_in_internal(replyptr,&flag); 
		
	}
	PQfinish(connToPushStandby);
	connToPushStandby = NULL;
	PQclear(pgres);
	return replylsn;	
	
}

XLogRecPtr QueryPushChkpointLsn() 
{
	ControlFileData *ControlFile;
	int			fd;
	char		ControlFilePath[MAXPGPATH];
	pg_crc32c	crc;
	int			r;
	
	ControlFile = palloc(sizeof(ControlFileData));

	fd = BasicOpenFile(XLOG_CONTROL_FILE,
					   O_RDONLY | PG_BINARY);
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m",
						XLOG_CONTROL_FILE)));

	r = read(fd, ControlFile, sizeof(ControlFileData));
	if (r != sizeof(ControlFileData))
	{
		if (r < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							XLOG_CONTROL_FILE)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							XLOG_CONTROL_FILE, r, sizeof(ControlFileData))));
	}
	close(fd);

	return ControlFile->checkPoint;
}

XLogRecPtr QueryMinLsn(XLogRecPtr lsn) 
{
	StringInfoData cmd;
	XLogRecPtr 		replylsn;
	PGresult   *pgres = NULL;
	char	   *appname;
	char       *state;
	char       *syncstate;
	char       *replyptr;
	replylsn = InvalidXLogRecPtr;
	if (pushconn == NULL) {
		if (ConnectPrimaryDB() == false) {
			return InvalidXLogRecPtr;
		} 
	} 

	initStringInfo(&cmd);
	appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name <> \'");
	appendStringInfoString(&cmd, "pushstandby");
	appendStringInfoString(&cmd, "\' order by t.replay_lsn limit 1");

	pgres = PQexec(pushconn, cmd.data);
	if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
		appname = PQgetvalue(pgres, 0, 0);
		replyptr = PQgetvalue(pgres, 0, 1);
		bool flag;
		replylsn = pg_lsn_in_internal(replyptr,&flag); 
		//replylsn = atol(replyptr);
		state = PQgetvalue(pgres, 0, 2);
		syncstate = PQgetvalue(pgres, 0, 3);
	}
	else if (PQresultStatus(pgres) == PGRES_BAD_RESPONSE || 
		PQresultStatus(pgres) == PGRES_NONFATAL_ERROR ||
		PQresultStatus(pgres) == PGRES_FATAL_ERROR)
	{
		PQfinish(pushconn);
		pushconn = NULL;
		PQclear(pgres);
		return InvalidXLogRecPtr;
	}
	//elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn);
	if (lsn !=InvalidXLogRecPtr && lsn < replylsn||replylsn == InvalidXLogRecPtr) {
		replylsn = lsn;
	}
	PQclear(pgres);
	return replylsn;
}

Queue DirtyPq = {
	NULL,
	NULL
};

void QueuePush(QDataType x)
{
	Queue* pq = &DirtyPq;
    QNode* newnode = (QNode*)malloc(sizeof(QNode));
    newnode->next = NULL;
    memcpy(&newnode->data,&x,sizeof(x));
    if (pq->tail == NULL)
    {
        pq->head = pq->tail = newnode;
    }
    else
    {
        pq->tail->next = newnode;
        pq->tail = newnode;
    }
}
  
//出队列
QDataType QueuePop()
{
    Queue* pq = &DirtyPq;
	QDataType data;
	memcpy(&data,&pq->head->data,sizeof(QDataType));
    if (pq->head->next == NULL)
    {
        free(pq->head);
        pq->head = pq->tail = NULL;
    }
    else
    {
        QNode* next = pq->head->next;
        free(pq->head);
        pq->head = next;
    }
	return data;
}

bool QueueEmpty()
{
	Queue* pq = &DirtyPq; 
    return pq->head == NULL;
}

XLogRecPtr QueueHeadEndLsn() 
{
	Queue* pq = &DirtyPq;
	return pq->head->data.endlsn;
}



